1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package rx.internal.operators;
17
18 import static org.mockito.Matchers.any;
19 import static org.mockito.Mockito.never;
20 import static org.mockito.Mockito.times;
21 import static org.mockito.Mockito.verify;
22
23 import java.util.Arrays;
24
25 import org.junit.Before;
26 import org.junit.Test;
27 import org.mockito.Mock;
28 import org.mockito.MockitoAnnotations;
29
30 import rx.Observable;
31 import rx.Observer;
32 import rx.Subscriber;
33 import rx.functions.Action1;
34 import rx.functions.Func1;
35 import rx.functions.Func2;
36 import rx.subjects.PublishSubject;
37
38 public class OnSubscribeGroupJoinTest {
39 @Mock
40 Observer<Object> observer;
41
42 Func2<Integer, Integer, Integer> add = new Func2<Integer, Integer, Integer>() {
43 @Override
44 public Integer call(Integer t1, Integer t2) {
45 return t1 + t2;
46 }
47 };
48
49 <T> Func1<Integer, Observable<T>> just(final Observable<T> observable) {
50 return new Func1<Integer, Observable<T>>() {
51 @Override
52 public Observable<T> call(Integer t1) {
53 return observable;
54 }
55 };
56 }
57
58 <T, R> Func1<T, Observable<R>> just2(final Observable<R> observable) {
59 return new Func1<T, Observable<R>>() {
60 @Override
61 public Observable<R> call(T t1) {
62 return observable;
63 }
64 };
65 }
66
67 Func2<Integer, Observable<Integer>, Observable<Integer>> add2 = new Func2<Integer, Observable<Integer>, Observable<Integer>>() {
68 @Override
69 public Observable<Integer> call(final Integer leftValue, Observable<Integer> rightValues) {
70 return rightValues.map(new Func1<Integer, Integer>() {
71 @Override
72 public Integer call(Integer rightValue) {
73 return add.call(leftValue, rightValue);
74 }
75 });
76 }
77
78 };
79
80 @Before
81 public void before() {
82 MockitoAnnotations.initMocks(this);
83 }
84
85 @Test
86 public void behaveAsJoin() {
87 PublishSubject<Integer> source1 = PublishSubject.create();
88 PublishSubject<Integer> source2 = PublishSubject.create();
89
90 Observable<Integer> m = Observable.merge(source1.groupJoin(source2,
91 just(Observable.never()),
92 just(Observable.never()), add2));
93
94 m.subscribe(observer);
95
96 source1.onNext(1);
97 source1.onNext(2);
98 source1.onNext(4);
99
100 source2.onNext(16);
101 source2.onNext(32);
102 source2.onNext(64);
103
104 source1.onCompleted();
105 source2.onCompleted();
106
107 verify(observer, times(1)).onNext(17);
108 verify(observer, times(1)).onNext(18);
109 verify(observer, times(1)).onNext(20);
110 verify(observer, times(1)).onNext(33);
111 verify(observer, times(1)).onNext(34);
112 verify(observer, times(1)).onNext(36);
113 verify(observer, times(1)).onNext(65);
114 verify(observer, times(1)).onNext(66);
115 verify(observer, times(1)).onNext(68);
116
117 verify(observer, times(1)).onCompleted();
118 verify(observer, never()).onError(any(Throwable.class));
119 }
120
/** Immutable holder pairing a numeric id with a name. */
class Person {
    /** Identifier of this person. */
    final int id;
    /** Display name of this person. */
    final String name;

    /**
     * @param id identifier stored in {@link #id}
     * @param name name stored in {@link #name}
     */
    public Person(int id, String name) {
        this.name = name;
        this.id = id;
    }
}
130
/** Immutable holder associating a fruit name with the id of a person. */
class PersonFruit {
    /** Id of the person this fruit is associated with. */
    final int personId;
    /** Name of the fruit. */
    final String fruit;

    /**
     * @param personId id stored in {@link #personId}
     * @param fruit fruit name stored in {@link #fruit}
     */
    public PersonFruit(int personId, String fruit) {
        this.fruit = fruit;
        this.personId = personId;
    }
}
140
141 class PPF {
142 final Person person;
143 final Observable<PersonFruit> fruits;
144
145 public PPF(Person person, Observable<PersonFruit> fruits) {
146 this.person = person;
147 this.fruits = fruits;
148 }
149 }
150
151 @Test
152 public void normal1() {
153 Observable<Person> source1 = Observable.from(Arrays.asList(
154 new Person(1, "Joe"),
155 new Person(2, "Mike"),
156 new Person(3, "Charlie")
157 ));
158
159 Observable<PersonFruit> source2 = Observable.from(Arrays.asList(
160 new PersonFruit(1, "Strawberry"),
161 new PersonFruit(1, "Apple"),
162 new PersonFruit(3, "Peach")
163 ));
164
165 Observable<PPF> q = source1.groupJoin(
166 source2,
167 just2(Observable.<Object> never()),
168 just2(Observable.<Object> never()),
169 new Func2<Person, Observable<PersonFruit>, PPF>() {
170 @Override
171 public PPF call(Person t1, Observable<PersonFruit> t2) {
172 return new PPF(t1, t2);
173 }
174 });
175
176 q.subscribe(
177 new Subscriber<PPF>() {
178 @Override
179 public void onNext(final PPF ppf) {
180 ppf.fruits.filter(new Func1<PersonFruit, Boolean>() {
181 @Override
182 public Boolean call(PersonFruit t1) {
183 return ppf.person.id == t1.personId;
184 }
185 }).subscribe(new Action1<PersonFruit>() {
186 @Override
187 public void call(PersonFruit t1) {
188 observer.onNext(Arrays.asList(ppf.person.name, t1.fruit));
189 }
190 });
191 }
192
193 @Override
194 public void onError(Throwable e) {
195 observer.onError(e);
196 }
197
198 @Override
199 public void onCompleted() {
200 observer.onCompleted();
201 }
202
203 }
204 );
205
206 verify(observer, times(1)).onNext(Arrays.asList("Joe", "Strawberry"));
207 verify(observer, times(1)).onNext(Arrays.asList("Joe", "Apple"));
208 verify(observer, times(1)).onNext(Arrays.asList("Charlie", "Peach"));
209
210 verify(observer, times(1)).onCompleted();
211 verify(observer, never()).onError(any(Throwable.class));
212 }
213
214 @Test
215 public void leftThrows() {
216 PublishSubject<Integer> source1 = PublishSubject.create();
217 PublishSubject<Integer> source2 = PublishSubject.create();
218
219 Observable<Observable<Integer>> m = source1.groupJoin(source2,
220 just(Observable.never()),
221 just(Observable.never()), add2);
222
223 m.subscribe(observer);
224
225 source2.onNext(1);
226 source1.onError(new RuntimeException("Forced failure"));
227
228 verify(observer, times(1)).onError(any(Throwable.class));
229 verify(observer, never()).onCompleted();
230 verify(observer, never()).onNext(any());
231 }
232
233 @Test
234 public void rightThrows() {
235 PublishSubject<Integer> source1 = PublishSubject.create();
236 PublishSubject<Integer> source2 = PublishSubject.create();
237
238 Observable<Observable<Integer>> m = source1.groupJoin(source2,
239 just(Observable.never()),
240 just(Observable.never()), add2);
241
242 m.subscribe(observer);
243
244 source1.onNext(1);
245 source2.onError(new RuntimeException("Forced failure"));
246
247 verify(observer, times(1)).onNext(any(Observable.class));
248 verify(observer, times(1)).onError(any(Throwable.class));
249 verify(observer, never()).onCompleted();
250 }
251
252 @Test
253 public void leftDurationThrows() {
254 PublishSubject<Integer> source1 = PublishSubject.create();
255 PublishSubject<Integer> source2 = PublishSubject.create();
256
257 Observable<Integer> duration1 = Observable.<Integer> error(new RuntimeException("Forced failure"));
258
259 Observable<Observable<Integer>> m = source1.groupJoin(source2,
260 just(duration1),
261 just(Observable.never()), add2);
262 m.subscribe(observer);
263
264 source1.onNext(1);
265
266 verify(observer, times(1)).onError(any(Throwable.class));
267 verify(observer, never()).onCompleted();
268 verify(observer, never()).onNext(any());
269 }
270
271 @Test
272 public void rightDurationThrows() {
273 PublishSubject<Integer> source1 = PublishSubject.create();
274 PublishSubject<Integer> source2 = PublishSubject.create();
275
276 Observable<Integer> duration1 = Observable.<Integer> error(new RuntimeException("Forced failure"));
277
278 Observable<Observable<Integer>> m = source1.groupJoin(source2,
279 just(Observable.never()),
280 just(duration1), add2);
281 m.subscribe(observer);
282
283 source2.onNext(1);
284
285 verify(observer, times(1)).onError(any(Throwable.class));
286 verify(observer, never()).onCompleted();
287 verify(observer, never()).onNext(any());
288 }
289
290 @Test
291 public void leftDurationSelectorThrows() {
292 PublishSubject<Integer> source1 = PublishSubject.create();
293 PublishSubject<Integer> source2 = PublishSubject.create();
294
295 Func1<Integer, Observable<Integer>> fail = new Func1<Integer, Observable<Integer>>() {
296 @Override
297 public Observable<Integer> call(Integer t1) {
298 throw new RuntimeException("Forced failure");
299 }
300 };
301
302 Observable<Observable<Integer>> m = source1.groupJoin(source2,
303 fail,
304 just(Observable.never()), add2);
305 m.subscribe(observer);
306
307 source1.onNext(1);
308
309 verify(observer, times(1)).onError(any(Throwable.class));
310 verify(observer, never()).onCompleted();
311 verify(observer, never()).onNext(any());
312 }
313
314 @Test
315 public void rightDurationSelectorThrows() {
316 PublishSubject<Integer> source1 = PublishSubject.create();
317 PublishSubject<Integer> source2 = PublishSubject.create();
318
319 Func1<Integer, Observable<Integer>> fail = new Func1<Integer, Observable<Integer>>() {
320 @Override
321 public Observable<Integer> call(Integer t1) {
322 throw new RuntimeException("Forced failure");
323 }
324 };
325
326 Observable<Observable<Integer>> m = source1.groupJoin(source2,
327 just(Observable.never()),
328 fail, add2);
329 m.subscribe(observer);
330
331 source2.onNext(1);
332
333 verify(observer, times(1)).onError(any(Throwable.class));
334 verify(observer, never()).onCompleted();
335 verify(observer, never()).onNext(any());
336 }
337
338 @Test
339 public void resultSelectorThrows() {
340 PublishSubject<Integer> source1 = PublishSubject.create();
341 PublishSubject<Integer> source2 = PublishSubject.create();
342
343 Func2<Integer, Observable<Integer>, Integer> fail = new Func2<Integer, Observable<Integer>, Integer>() {
344 @Override
345 public Integer call(Integer t1, Observable<Integer> t2) {
346 throw new RuntimeException("Forced failure");
347 }
348 };
349
350 Observable<Integer> m = source1.groupJoin(source2,
351 just(Observable.never()),
352 just(Observable.never()), fail);
353 m.subscribe(observer);
354
355 source1.onNext(1);
356 source2.onNext(2);
357
358 verify(observer, times(1)).onError(any(Throwable.class));
359 verify(observer, never()).onCompleted();
360 verify(observer, never()).onNext(any());
361 }
362 }